package com.atguigu.flink.sql.join;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 *
 *  维表关联。
 *      使用流去读取事实，做成动态表。
 *      维度数据存储在外部的数据库（Mysql,HBase,postgresql）中。每来一条事实数据，读取外部的数据库进行关联。
 *
 */
public class Demo3_LoopUpJoin
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> ds1 = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        Schema schema = Schema.newBuilder()
                              .column("id", "STRING")
                              .column("ts", "BIGINT")
                              .column("vc", "INT")
                              //lookupjoin强制要求事实表有处理时间
                              .columnByExpression("pt", "proctime()")
                              .columnByExpression("et", "TO_TIMESTAMP_LTZ(ts,3)")
                              .watermark("et","et - INTERVAL '0.001' SECOND")
                              .build();

        //事实表
        tableEnvironment.createTemporaryView("t1",ds1,schema);

        //定义维度表
        String createTableSql = " create table t2 ( id STRING, name STRING ) with (" +
            " 'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://hadoop102:3306/Mybatis?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8'," +
            "  'username' = 'root'," +
            "  'password' = '000000'," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
            "  'table-name' = 'ws2'" +
            ")";
        tableEnvironment.executeSql(createTableSql);

        //关联
        String sql = " select t1.id,t1.vc,t2.name from t1 left join t2 " +
                        " for SYSTEM_TIME as of t1.pt" +
                        "  on t1.id = t2.id";

        tableEnvironment.sqlQuery(sql).execute().print();

    }
}
